package com.tcs.gather.commod.exec.input.impl;

import java.util.Arrays;

import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tcs.gather.commod.exec.input.AbstractInputCommod;
import com.tcs.gather.model.CommandModel;
import com.tcs.gather.model.kafka.KafkaCommandModel;
import com.tcs.queue.ISingleionQueue;
import com.tcs.util.constant.SystemConstants;

/**
 * Input command that subscribes a Kafka consumer to a single topic and polls
 * it in an endless loop.
 *
 * <p>The topic name is read from the command model's properties under
 * {@code SystemConstants.GatherConstants.KAFKA_TOPIC}; the consumer itself is
 * configured from {@link KafkaCommandModel#getConsumerProperties()}.
 */
public class KafkaInputCommod extends AbstractInputCommod {

	private static final Logger logger = LoggerFactory.getLogger(KafkaInputCommod.class);

	/** Timeout handed to every {@code poll} call; Kafka's {@code poll(long)} takes milliseconds. */
	private static final long POLL_TIMEOUT_MS = 100L;

	/** Consumer created by {@link #execInputCommod(CommandModel)}; unset until that method runs. */
	private Consumer<String, String> consumer;

	/** Creates an instance without a command model or queue. */
	public KafkaInputCommod() {
	}

	/**
	 * Creates an instance, handing both arguments to the superclass constructor.
	 *
	 * @param commandModel command description forwarded to the superclass
	 * @param queue        queue forwarded to the superclass
	 */
	public KafkaInputCommod(CommandModel commandModel, ISingleionQueue<String> queue) {
		super(commandModel, queue);
	}

	/**
	 * Subscribes to the configured topic and polls it until an exception ends the loop.
	 *
	 * <p>When the topic property is blank an error is logged and the method
	 * returns without creating a consumer. Otherwise the method does not return
	 * normally; the consumer is closed when the loop is left by an exception.
	 *
	 * @param commandModel must be a {@link KafkaCommandModel}
	 * @throws ClassCastException if {@code commandModel} is not a {@link KafkaCommandModel}
	 */
	@Override
	public void execInputCommod(CommandModel commandModel) {
		KafkaCommandModel kafkaCommandModel = (KafkaCommandModel) commandModel;

		// Validate the topic before opening the consumer so that the early
		// return below cannot leave an unclosed consumer behind.
		String topic = commandModel.getProperties().getProperty(SystemConstants.GatherConstants.KAFKA_TOPIC);
		if (StringUtils.isBlank(topic)) {
			logger.error("", new NullPointerException("topic 为空!"));
			return;
		}

		consumer = new KafkaConsumer<>(kafkaCommandModel.getConsumerProperties());
		try {
			consumer.subscribe(Arrays.asList(topic));
			logger.info(" commandModel : {} ", commandModel);
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
				for (ConsumerRecord<String, String> record : records) {
					// NOTE(review): the value is read and then discarded. Presumably it
					// should be forwarded to the queue given to the constructor — confirm
					// against AbstractInputCommod / ISingleionQueue before changing.
					record.value();
				}
			}
		} finally {
			// Release the consumer's resources whenever the loop is abandoned.
			consumer.close();
		}
	}

}
